package com.jl.rocketmq.producer.demo.producer;

import com.jl.rocketmq.consumer.RocketmqConstant;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Asynchronous producer demo.
 *
 * <p>Sends a fixed number of messages with the callback-based
 * {@code send(Message, SendCallback)} API, waits until every callback has
 * fired (or a timeout elapses), and then shuts the producer down so that its
 * resources are released.
 *
 * @author jl
 * @since 2021/4/5
 */
public class AsyncProducer {

    /** Number of demo messages to send. */
    private static final int MESSAGE_COUNT = 5;

    /** Per-send timeout in milliseconds; also used as the bound when waiting for outstanding callbacks. */
    private static final int SEND_TIMEOUT_MILLIS = 10000;

    /**
     * Starts a producer, sends {@link #MESSAGE_COUNT} messages asynchronously
     * (one per second) and shuts the producer down once all callbacks have
     * been received.
     *
     * @param args unused
     * @throws Exception if the producer fails to start, a send call fails
     *                   synchronously, or the thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(RocketmqConstant.producerGroup1);
        producer.setSendMsgTimeout(SEND_TIMEOUT_MILLIS);
        // Name server address used to discover brokers.
        producer.setNamesrvAddr(RocketmqConstant.mq_namesrv_cluster);
        // The retry count for failed asynchronous sends can be tuned with
        // producer.setRetryTimesWhenSendAsyncFailed(int) before start() if needed.
        producer.start();

        // One count per message; each callback (success or failure) counts down once.
        final CountDownLatch pending = new CountDownLatch(MESSAGE_COUNT);
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
                byte[] body = ("第" + i + "条异步测试消息").getBytes(StandardCharsets.UTF_8);
                Message msg = new Message(RocketmqConstant.topic1, RocketmqConstant.tag_async, body);

                // Asynchronous send: the result is delivered to the callback.
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("发送成功： " + sendResult);
                        pending.countDown();
                    }

                    @Override
                    public void onException(Throwable e) {
                        // On failure, retry the send here or adjust the business logic accordingly.
                        System.out.println("发送异常");
                        e.printStackTrace();
                        pending.countDown();
                    }
                });
                TimeUnit.SECONDS.sleep(1);
            }

            // Shutting down before the callbacks arrive would drop them, so wait
            // (bounded) for all outstanding callbacks first.
            if (!pending.await(SEND_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                System.err.println("等待异步发送回调超时, 未完成数量: " + pending.getCount());
            }
            System.out.println("发送完毕");
        } finally {
            // Always release the producer, even when sending fails or the thread is interrupted.
            producer.shutdown();
        }
    }

}
